﻿using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Bridge.Workflow.Domain;
using Bridge.Workflow.Extension;
using Microsoft.EntityFrameworkCore;
using Volo.Abp.Domain.Repositories;
using Volo.Abp.Domain.Services;
using Volo.Abp.Guids;
using Volo.Abp.Linq;
using Volo.Abp.Uow;
using Volo.Abp.Users;
using WorkflowCore.Models;

namespace Bridge.Workflow.EntityFrameworkCore.Persistence
{
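    /// <summary>
    /// Persistence provider that stores workflow instances, events, event subscriptions,
    /// execution errors and audit records through ABP repositories.
    /// </summary>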
    public class AbpPersistenceProvider : DomainService, IAbpPersistenceProvider
    {
        // Constant name "AuditEvent"; it is not referenced by any member of this class.
        private const string ActionName = "AuditEvent";
        // Repository for persisted workflow events (Guid key).
        private readonly IRepository<PersistedEvent, Guid> _eventRepository;
        // Repository for persisted execution pointers (string key).
        private readonly IRepository<PersistedExecutionPointer, string> _executionPointerRepository;
        // Repository for persisted workflow instances (string key).
        private readonly IRepository<PersistedWorkflow, string> _workflowRepository;
        // Repository for persisted workflow definitions (string key).
        private readonly IRepository<PersistedWorkflowDefinition, string> _workflowDefinitionRepository;
        // Repository for persisted event subscriptions (Guid key).
        private readonly IRepository<PersistedSubscription, Guid> _eventSubscriptionRepository;
        // Repository for persisted execution errors (Guid key).
        private readonly IRepository<PersistedExecutionError, Guid> _executionErrorRepository;
        // Generates the identifiers assigned to new events, subscriptions and workflows.
        private readonly IGuidGenerator _guidGenerator;
        // Executes IQueryable instances asynchronously (used by GetRunnableInstances).
        private readonly IAsyncQueryableExecuter _asyncQueryableExecuter;
        // Current user; read when stamping newly created workflows.
        private readonly ICurrentUser _currentUser;
        // Unit of work on which SaveChangesAsync is called after writes.
        // NOTE(review): this instance is constructor-injected; confirm it is the same unit of work
        // the [UnitOfWork] attribute starts, otherwise the explicit SaveChangesAsync calls may be no-ops.
        private readonly IUnitOfWork _unitOfWork;
        // Repository for workflow audit records (string key).
        private readonly IRepository<PersistedWorkflowAuditor, string> _auditorRepository;


        /// <summary>
        /// Creates the provider and stores every injected dependency in its backing field.
        /// </summary>
        public AbpPersistenceProvider(
            IRepository<PersistedEvent, Guid> eventRepository,
            IRepository<PersistedExecutionPointer, string> executionPointerRepository,
            IRepository<PersistedWorkflow, string> workflowRepository,
            IRepository<PersistedSubscription, Guid> eventSubscriptionRepository,
            IGuidGenerator guidGenerator,
            IRepository<PersistedExecutionError, Guid> executionErrorRepository,
            IRepository<PersistedWorkflowDefinition, string> workflowDefinitionRepository,
            IAsyncQueryableExecuter asyncQueryableExecuter,
            ICurrentUser currentUser,
            IUnitOfWork unitOfWork,
            IRepository<PersistedWorkflowAuditor, string> auditorRepository)
        {
            // Repositories.
            _workflowRepository = workflowRepository;
            _workflowDefinitionRepository = workflowDefinitionRepository;
            _executionPointerRepository = executionPointerRepository;
            _executionErrorRepository = executionErrorRepository;
            _eventRepository = eventRepository;
            _eventSubscriptionRepository = eventSubscriptionRepository;
            _auditorRepository = auditorRepository;

            // Infrastructure services.
            _guidGenerator = guidGenerator;
            _asyncQueryableExecuter = asyncQueryableExecuter;
            _currentUser = currentUser;
            _unitOfWork = unitOfWork;
        }

        /// <summary>
        /// Flag indicating whether this provider supports scheduled commands.
        /// It is not assigned anywhere in this class, so it keeps its default of <c>false</c>;
        /// <see cref="ScheduleCommand"/> and <see cref="ProcessCommands"/> both throw
        /// <see cref="NotImplementedException"/>.
        /// </summary>
        public bool SupportsScheduledCommands { get; set; }


       

        /// <summary>
        /// Stores a new workflow event under a freshly generated identifier.
        /// </summary>
        /// <param name="newEvent">Event to store; its <c>Id</c> is overwritten with the generated identifier.</param>
        /// <param name="cancellationToken">Token forwarded to the insert and save calls.</param>
        /// <returns>The identifier assigned to the event.</returns>
        [UnitOfWork]
        public virtual async Task<string> CreateEvent(Event newEvent, CancellationToken cancellationToken = default)
        {
            var generatedId = _guidGenerator.Create();
            newEvent.Id = generatedId.ToString();

            // Insert without auto-save, then flush explicitly.
            await _eventRepository.InsertAsync(newEvent.ToPersistable(), false, cancellationToken);
            await _unitOfWork.SaveChangesAsync(cancellationToken);

            return newEvent.Id;
        }

        /// <summary>
        /// Stores a new event subscription under a freshly generated identifier.
        /// </summary>
        /// <param name="subscription">Subscription to store; its <c>Id</c> is overwritten with the generated identifier.</param>
        /// <param name="cancellationToken">Token forwarded to the insert and save calls.</param>
        /// <returns>The identifier assigned to the subscription.</returns>
        [UnitOfWork]
        public virtual async Task<string> CreateEventSubscription(EventSubscription subscription, CancellationToken cancellationToken = default)
        {
            var generatedId = _guidGenerator.Create();
            subscription.Id = generatedId.ToString();

            // Insert without auto-save, then flush explicitly.
            await _eventSubscriptionRepository.InsertAsync(subscription.ToPersistable(), false, cancellationToken);
            await _unitOfWork.SaveChangesAsync(cancellationToken);

            return subscription.Id;
        }

        /// <summary>
        /// Stores a new workflow instance, assigning it a generated identifier and a creation time.
        /// When a current user is available, the user's name and id are stamped on the stored entity.
        /// </summary>
        /// <param name="workflow">Workflow to store; its <c>CreateTime</c> and <c>Id</c> are overwritten.</param>
        /// <param name="cancellationToken">Token forwarded to the insert and save calls.</param>
        /// <returns>The identifier assigned to the workflow.</returns>
        [UnitOfWork]
        public virtual async Task<string> CreateNewWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default)
        {
            // NOTE(review): local time is used here while the runnable queries in this class
            // convert to UTC; confirm which time zone CreateTime is expected to carry.
            workflow.CreateTime = DateTime.Now;
            workflow.Id = _guidGenerator.Create().ToString();

            var entity = workflow.ToPersistable();

            var hasCurrentUser = _currentUser.Id.HasValue;
            if (hasCurrentUser)
            {
                entity.CreateUserIdentityName = _currentUser.Name;
                entity.LastModifierId = _currentUser.Id;
            }

            await _workflowRepository.InsertAsync(entity, false, cancellationToken);
            await _unitOfWork.SaveChangesAsync(cancellationToken);

            return workflow.Id;
        }

        /// <summary>
        /// Writes a completion message to the console. No storage or schema work is performed here.
        /// </summary>
        [UnitOfWork]
        public virtual void EnsureStoreExists() => Console.WriteLine("工作流初始化完成！");

        /// <summary>
        /// Returns the persisted workflows that belong to the given definition id and version.
        /// </summary>
        /// <param name="definitionId">Workflow definition identifier to match.</param>
        /// <param name="version">Workflow definition version to match.</param>
        /// <returns>All persisted workflows matching both values.</returns>
        [UnitOfWork]
        public virtual async Task<IEnumerable<PersistedWorkflow>> GetAllRunnablePersistedWorkflow(string definitionId, int version)
        {
            // NOTE(review): despite the method name, no status filter is applied here.
            var matches = await _workflowRepository.GetListAsync(
                wf => wf.WorkflowDefinitionId == definitionId && wf.Version == version);

            return matches;
        }

        /// <summary>
        /// Loads a single event by its identifier.
        /// </summary>
        /// <param name="id">String form of the event's Guid identifier.</param>
        /// <param name="cancellationToken">Token forwarded to the repository query.</param>
        /// <returns>The event, or <c>null</c> when no event with that identifier is stored.</returns>
        /// <exception cref="FormatException"><paramref name="id"/> is not a valid Guid.</exception>
        [UnitOfWork]
        public virtual async Task<Event> GetEvent(string id, CancellationToken cancellationToken = default)
        {
            var eventId = Guid.Parse(id);
            var stored = await _eventRepository.FirstOrDefaultAsync(x => x.Id == eventId, cancellationToken);

            // FirstOrDefaultAsync yields null for a missing row; propagate that as null,
            // the same way GetSubscription and GetFirstOpenSubscription do.
            return stored?.ToEvent();
        }

        /// <summary>
        /// Returns the identifiers of events with the given name and key whose event time is at or after <paramref name="asOf"/>.
        /// </summary>
        /// <param name="eventName">Event name to match.</param>
        /// <param name="eventKey">Event key to match.</param>
        /// <param name="asOf">Lower bound (inclusive) for the event time.</param>
        /// <param name="cancellationToken">Token forwarded to the query execution.</param>
        /// <returns>The matching event identifiers as strings.</returns>
        [UnitOfWork]
        public virtual async Task<IEnumerable<string>> GetEvents(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = default)
        {
            var events = await _eventRepository.GetQueryableAsync();

            var matchingIds = await events
                .Where(e => e.EventName == eventName && e.EventKey == eventKey)
                .Where(e => e.EventTime >= asOf)
                .Select(e => e.Id)
                .ToListAsync(cancellationToken);

            return matchingIds.Select(eventId => eventId.ToString());
        }

        /// <summary>
        /// Finds the first subscription for the given event name and key that became active at or before
        /// <paramref name="asOf"/> and has no external token assigned.
        /// </summary>
        /// <param name="eventName">Event name to match.</param>
        /// <param name="eventKey">Event key to match.</param>
        /// <param name="asOf">Upper bound (inclusive) for the subscription's <c>SubscribeAsOf</c>.</param>
        /// <param name="cancellationToken">Token forwarded to the repository query.</param>
        /// <returns>The matching subscription, or <c>null</c> when none is found.</returns>
        [UnitOfWork]
        public virtual async Task<EventSubscription> GetFirstOpenSubscription(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = default)
        {
            var openSubscription = await _eventSubscriptionRepository.FirstOrDefaultAsync(
                s => s.EventName == eventName
                     && s.EventKey == eventKey
                     && s.SubscribeAsOf <= asOf
                     && s.ExternalToken == null,
                cancellationToken);

            if (openSubscription == null)
            {
                return null;
            }

            return openSubscription.ToEventSubscription();
        }

        /// <summary>
        /// Loads the persisted execution pointer with the given identifier.
        /// </summary>
        /// <param name="id">Identifier of the execution pointer.</param>
        /// <returns>The entity returned by the repository's <c>GetAsync</c>.</returns>
        [UnitOfWork]
        public virtual async Task<PersistedExecutionPointer> GetPersistedExecutionPointer(string id)
        {
            var pointer = await _executionPointerRepository.GetAsync(id);
            return pointer;
        }

        /// <summary>
        /// Loads the persisted workflow with the given identifier.
        /// </summary>
        /// <param name="id">Identifier of the workflow.</param>
        /// <returns>The entity returned by the repository's <c>GetAsync</c>.</returns>
        [UnitOfWork]
        public virtual async Task<PersistedWorkflow> GetPersistedWorkflow(string id)
        {
            var workflow = await _workflowRepository.GetAsync(id);
            return workflow;
        }

        /// <summary>
        /// Finds the persisted workflow definition with the given identifier and version.
        /// </summary>
        /// <param name="id">Identifier of the workflow definition.</param>
        /// <param name="version">Version of the workflow definition.</param>
        /// <returns>The first matching definition, or <c>null</c> when none matches.</returns>
        [UnitOfWork]
        public virtual async Task<PersistedWorkflowDefinition> GetPersistedWorkflowDefinition(string id, int version)
        {
            var definition = await _workflowDefinitionRepository.FirstOrDefaultAsync(
                d => d.Id == id && d.Version == version);

            return definition;
        }

        /// <summary>
        /// Returns the identifiers of unprocessed events whose event time is at or before <paramref name="asAt"/>.
        /// </summary>
        /// <param name="asAt">Cut-off time; converted to UTC before it is compared with the stored event time.</param>
        /// <param name="cancellationToken">Token forwarded to the query execution.</param>
        /// <returns>The identifiers (Guid strings) of the events that are due.</returns>
        [UnitOfWork]
        public virtual async Task<IEnumerable<string>> GetRunnableEvents(DateTime asAt, CancellationToken cancellationToken = default)
        {
            var cutoff = asAt.ToUniversalTime();

            // Project to the key in the query so only ids are fetched. The result must hold the
            // event ids (not the entity's ToString()), because GetEvent and MarkEventProcessed
            // parse these strings back into Guids.
            var dueIds = await (await _eventRepository.GetQueryableAsync())
                .Where(x => !x.IsProcessed && x.EventTime <= cutoff)
                .Select(x => x.Id)
                .ToListAsync(cancellationToken);

            return dueIds.Select(eventId => eventId.ToString()).ToList();
        }

        /// <summary>
        /// Returns the identifiers of workflows in the <c>Runnable</c> status whose next execution
        /// is set and is at or before <paramref name="asAt"/>.
        /// </summary>
        /// <param name="asAt">Cut-off time; converted to UTC ticks before comparison with <c>NextExecution</c>.</param>
        /// <param name="cancellationToken">Token forwarded to the query execution.</param>
        /// <returns>The identifiers of the workflows that are due.</returns>
        [UnitOfWork]
        public virtual async Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt, CancellationToken cancellationToken = default)
        {
            var cutoffTicks = asAt.ToUniversalTime().Ticks;
            var workflows = await _workflowRepository.GetQueryableAsync();

            var dueIds =
                from wf in workflows
                where wf.NextExecution.HasValue &&
                      wf.NextExecution <= cutoffTicks &&
                      wf.Status == WorkflowStatus.Runnable
                select wf.Id;

            return await _asyncQueryableExecuter.ToListAsync(dueIds, cancellationToken);
        }

        /// <summary>
        /// Loads a single event subscription by its identifier.
        /// </summary>
        /// <param name="eventSubscriptionId">String form of the subscription's Guid identifier.</param>
        /// <param name="cancellationToken">Token forwarded to the repository query.</param>
        /// <returns>The subscription, or <c>null</c> when none with that identifier is stored.</returns>
        [UnitOfWork]
        public virtual async Task<EventSubscription> GetSubscription(string eventSubscriptionId, CancellationToken cancellationToken = default)
        {
            var subscriptionId = Guid.Parse(eventSubscriptionId);
            var stored = await _eventSubscriptionRepository.FirstOrDefaultAsync(
                s => s.Id == subscriptionId,
                cancellationToken);

            if (stored == null)
            {
                return null;
            }

            return stored.ToEventSubscription();
        }


       

        /// <summary>
        /// Returns one page of workflow instances, optionally filtered by status, definition id and creation time.
        /// Execution pointers and their extension attributes are loaded with each workflow.
        /// </summary>
        /// <param name="status">When set, only workflows in this status are returned.</param>
        /// <param name="type">When not null or empty, only workflows with this workflow definition id are returned.</param>
        /// <param name="createdFrom">When set, lower bound (inclusive) for the creation time.</param>
        /// <param name="createdTo">When set, upper bound (inclusive) for the creation time.</param>
        /// <param name="skip">Number of matching workflows to skip.</param>
        /// <param name="take">Maximum number of workflows to return.</param>
        /// <returns>The requested page, ordered by creation time and then by id.</returns>
        [UnitOfWork]
        public virtual async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take)
        {
            IQueryable<PersistedWorkflow> query = (await _workflowRepository.GetQueryableAsync())
                .Include(wf => wf.ExecutionPointers)
                .ThenInclude(ep => ep.ExtensionAttributes);

            if (status.HasValue)
            {
                query = query.Where(x => x.Status == status.Value);
            }

            if (!string.IsNullOrEmpty(type))
            {
                query = query.Where(x => x.WorkflowDefinitionId == type);
            }

            if (createdFrom.HasValue)
            {
                query = query.Where(x => x.CreationTime >= createdFrom.Value);
            }

            if (createdTo.HasValue)
            {
                query = query.Where(x => x.CreationTime <= createdTo.Value);
            }

            // Skip/Take over an unordered query returns rows in an unspecified order, so successive
            // pages could overlap or miss rows. Order explicitly, with the id as a tie-breaker.
            var page = await query
                .OrderBy(x => x.CreationTime)
                .ThenBy(x => x.Id)
                .Skip(skip)
                .Take(take)
                .ToListAsync();

            return page.Select(x => x.ToWorkflowInstance()).ToList();
        }

        /// <summary>
        /// Loads the workflow instances with the given identifiers, including their execution pointers
        /// and the pointers' extension attributes.
        /// </summary>
        /// <param name="ids">Identifiers to load; <c>null</c> or an empty sequence yields an empty result.</param>
        /// <param name="cancellationToken">Token forwarded to the query execution.</param>
        /// <returns>The workflow instances found for the given identifiers.</returns>
        [UnitOfWork]
        public virtual async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(IEnumerable<string> ids, CancellationToken cancellationToken = default)
        {
            if (ids == null)
            {
                return new List<WorkflowInstance>();
            }

            // Materialize once: the caller's sequence is enumerated a single time and the query
            // captures a concrete list rather than a deferred enumerable.
            var idList = ids.ToList();
            if (idList.Count == 0)
            {
                // Nothing can match, so skip the database round-trip.
                return new List<WorkflowInstance>();
            }

            var stored = await (await _workflowRepository.GetQueryableAsync())
                .Include(wf => wf.ExecutionPointers)
                .ThenInclude(ep => ep.ExtensionAttributes)
                .Where(x => idList.Contains(x.Id))
                .ToListAsync(cancellationToken);

            // Map eagerly so repeated enumeration by the caller does not re-run the conversion.
            return stored.Select(x => x.ToWorkflowInstance()).ToList();
        }

        /// <summary>
        /// Marks the event with the given identifier as processed and saves the change.
        /// </summary>
        /// <param name="id">String form of the event's Guid identifier.</param>
        /// <param name="cancellationToken">Token forwarded to the lookup and save calls.</param>
        /// <returns>A task that completes once the change has been saved.</returns>
        /// <exception cref="FormatException"><paramref name="id"/> is not a valid Guid.</exception>
        /// <exception cref="InvalidOperationException">No event with that identifier is stored.</exception>
        [UnitOfWork]
        public virtual async Task MarkEventProcessed(string id, CancellationToken cancellationToken = default)
        {
            var eventId = Guid.Parse(id);
            var existingEntity = await _eventRepository.FirstOrDefaultAsync(a => a.Id == eventId, cancellationToken);

            if (existingEntity == null)
            {
                // Fail with a descriptive error instead of a NullReferenceException below.
                throw new InvalidOperationException($"Event '{id}' was not found and cannot be marked as processed.");
            }

            existingEntity.IsProcessed = true;

            // No explicit UpdateAsync call: this relies on the loaded entity being change-tracked.
            // NOTE(review): confirm change tracking is enabled for this repository.
            await _unitOfWork.SaveChangesAsync(cancellationToken);
        }

        /// <summary>
        /// Marks the event with the given identifier as not processed.
        /// </summary>
        /// <param name="id">String form of the event's Guid identifier.</param>
        /// <param name="cancellationToken">Token forwarded to the lookup and update calls.</param>
        /// <returns>A task that completes once the update has been issued to the repository.</returns>
        /// <exception cref="FormatException"><paramref name="id"/> is not a valid Guid.</exception>
        /// <exception cref="InvalidOperationException">No event with that identifier is stored.</exception>
        [UnitOfWork]
        public virtual async Task MarkEventUnprocessed(string id, CancellationToken cancellationToken = default)
        {
            var eventId = Guid.Parse(id);
            var existingEntity = await _eventRepository.FirstOrDefaultAsync(x => x.Id == eventId, cancellationToken);

            if (existingEntity == null)
            {
                // Fail with a descriptive error instead of a NullReferenceException below.
                throw new InvalidOperationException($"Event '{id}' was not found and cannot be marked as unprocessed.");
            }

            existingEntity.IsProcessed = false;

            // autoSave stays false, as in the original single-argument call.
            await _eventRepository.UpdateAsync(existingEntity, false, cancellationToken);
        }

        /// <summary>
        /// Stores the given execution errors and saves them in one go.
        /// Does nothing when the sequence is empty.
        /// </summary>
        /// <param name="errors">Errors to store.</param>
        /// <param name="cancellationToken">Token forwarded to the insert and save calls.</param>
        /// <returns>A task that completes once all errors have been saved.</returns>
        [UnitOfWork]
        public virtual async Task PersistErrors(IEnumerable<ExecutionError> errors, CancellationToken cancellationToken = default)
        {
            // Reuse the caller's array when there is one; otherwise snapshot the sequence once.
            var pending = errors as ExecutionError[] ?? errors.ToArray();
            if (pending.Length == 0)
            {
                return;
            }

            for (var index = 0; index < pending.Length; index++)
            {
                var entity = pending[index].ToPersistable();
                await _executionErrorRepository.InsertAsync(entity, false, cancellationToken);
            }

            await _unitOfWork.SaveChangesAsync(cancellationToken);
        }

        /// <summary>
        /// Saves the state of an existing workflow instance.
        /// The stored workflow is loaded with tracking, together with its execution pointers and their
        /// extension attributes, and is passed to <c>ToPersistable</c> before the changes are saved.
        /// </summary>
        /// <param name="workflow">Workflow instance whose state should be saved; it must already exist in the store.</param>
        /// <param name="cancellationToken">Token forwarded to the query and save calls.</param>
        /// <returns>A task that completes once the changes have been saved.</returns>
        [UnitOfWork]
        public virtual async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default)
        {
            var workflows = await _workflowRepository.GetQueryableAsync();

            // FirstAsync throws when the workflow does not exist.
            var trackedEntity = await workflows
                .Where(x => x.Id == workflow.Id)
                .Include(wf => wf.ExecutionPointers)
                .ThenInclude(ep => ep.ExtensionAttributes)
                .Include(wf => wf.ExecutionPointers)
                .AsTracking()
                .FirstAsync(cancellationToken);

            // The return value is not needed; presumably ToPersistable writes the workflow's state
            // onto the tracked entity passed in - verify against the extension method.
            workflow.ToPersistable(trackedEntity);

            await _unitOfWork.SaveChangesAsync(cancellationToken);
        }

        /// <summary>
        /// Not implemented; always throws.
        /// </summary>
        /// <param name="asOf">Unused.</param>
        /// <param name="action">Unused.</param>
        /// <param name="cancellationToken">Unused.</param>
        /// <returns>Never returns.</returns>
        /// <exception cref="NotImplementedException">Always thrown, synchronously.</exception>
        [UnitOfWork]
        public virtual Task ProcessCommands(DateTimeOffset asOf, Func<ScheduledCommand, Task> action, CancellationToken cancellationToken = default)
            => throw new NotImplementedException();

        /// <summary>
        /// Not implemented; always throws.
        /// </summary>
        /// <param name="command">Unused.</param>
        /// <returns>Never returns.</returns>
        /// <exception cref="NotImplementedException">Always thrown, synchronously.</exception>
        [UnitOfWork]
        public Task ScheduleCommand(ScheduledCommand command)
            => throw new NotImplementedException();

        /// <summary>
        /// Assigns an external token, worker id and token expiry to an event subscription.
        /// </summary>
        /// <param name="eventSubscriptionId">String form of the subscription's Guid identifier.</param>
        /// <param name="token">External token to store on the subscription.</param>
        /// <param name="workerId">External worker id to store on the subscription.</param>
        /// <param name="expiry">Expiry time to store for the token.</param>
        /// <param name="cancellationToken">Token forwarded to the lookup and update calls.</param>
        /// <returns><c>true</c> when the subscription was found and updated; <c>false</c> when it does not exist.</returns>
        /// <exception cref="FormatException"><paramref name="eventSubscriptionId"/> is not a valid Guid.</exception>
        [UnitOfWork]
        public virtual async Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = default)
        {
            var subscriptionId = Guid.Parse(eventSubscriptionId);
            var existingEntity = await _eventSubscriptionRepository.FirstOrDefaultAsync(x => x.Id == subscriptionId, cancellationToken);

            if (existingEntity == null)
            {
                // The subscription is gone, so no token can be set: report failure via the
                // return value rather than failing with a NullReferenceException.
                return false;
            }

            existingEntity.ExternalToken = token;
            existingEntity.ExternalWorkerId = workerId;
            existingEntity.ExternalTokenExpiry = expiry;

            // autoSave stays false, as in the original single-argument call.
            await _eventSubscriptionRepository.UpdateAsync(existingEntity, false, cancellationToken);

            return true;
        }

        /// <summary>
        /// Deletes the event subscription with the given identifier.
        /// Does nothing when no such subscription exists.
        /// </summary>
        /// <param name="eventSubscriptionId">String form of the subscription's Guid identifier.</param>
        /// <param name="cancellationToken">Token forwarded to the lookup, delete and save calls.</param>
        /// <returns>A task that completes once the deletion has been saved.</returns>
        /// <exception cref="FormatException"><paramref name="eventSubscriptionId"/> is not a valid Guid.</exception>
        [UnitOfWork]
        public virtual async Task TerminateSubscription(string eventSubscriptionId, CancellationToken cancellationToken = default)
        {
            var subscriptionId = Guid.Parse(eventSubscriptionId);
            var existing = await _eventSubscriptionRepository.FirstOrDefaultAsync(x => x.Id == subscriptionId, cancellationToken);

            if (existing == null)
            {
                // Already removed: there is nothing to delete, and passing null to DeleteAsync would fail.
                return;
            }

            await _eventSubscriptionRepository.DeleteAsync(existing, false, cancellationToken);
            await _unitOfWork.SaveChangesAsync(cancellationToken);
        }

        /// <summary>
        /// Writes an audit record.
        /// </summary>
        /// <param name="insertModel">Audit record to insert.</param>
        /// <returns>The entity returned by the repository after the insert (auto-save enabled).</returns>
        [UnitOfWork]
        public virtual async Task<PersistedWorkflowAuditor> InsertAuditor(PersistedWorkflowAuditor insertModel)
        {
            var inserted = await _auditorRepository.InsertAsync(insertModel, true);
            return inserted;
        }
        /// <summary>
        /// Checks whether an audit record exists for the given execution pointer, user and audit status.
        /// </summary>
        /// <param name="id">Execution pointer identifier to match.</param>
        /// <param name="userId">User identifier to match.</param>
        /// <param name="auditStatus">Audit status to match.</param>
        /// <returns><c>true</c> when at least one matching audit record exists; otherwise <c>false</c>.</returns>
        [UnitOfWork]
        public virtual async Task<bool> AuditorAnyAsync(string id, Guid userId, EnumAuditStatus auditStatus)
        {
            var exists = await _auditorRepository.AnyAsync(
                a => a.ExecutionPointerId == id && a.UserId == userId && a.Status == auditStatus);

            return exists;
        }
        /// <summary>
        /// Counts the audit records for the given execution pointer that have the given audit status.
        /// </summary>
        /// <param name="id">Execution pointer identifier to match.</param>
        /// <param name="auditStatus">Audit status to match.</param>
        /// <returns>The number of matching audit records.</returns>
        [UnitOfWork]
        public virtual async Task<int> AuditorCountAsync(string id, EnumAuditStatus auditStatus)
        {
            var count = await _auditorRepository.CountAsync(
                a => a.ExecutionPointerId == id && a.Status == auditStatus);

            return count;
        }

        /// <summary>
        /// Loads a single workflow instance by its identifier, including its execution pointers
        /// and the pointers' extension attributes.
        /// </summary>
        /// <param name="Id">Identifier of the workflow instance.</param>
        /// <param name="cancellationToken">Token forwarded to the query execution.</param>
        /// <returns>The workflow instance, or <c>null</c> when none with that identifier is stored.</returns>
        [UnitOfWork]
        public virtual async Task<WorkflowInstance> GetWorkflowInstance(string Id, CancellationToken cancellationToken = default)
        {
            // Same eager-loading shape as GetWorkflowInstances, restricted to one id.
            var stored = await (await _workflowRepository.GetQueryableAsync())
                .Include(wf => wf.ExecutionPointers)
                .ThenInclude(ep => ep.ExtensionAttributes)
                .FirstOrDefaultAsync(x => x.Id == Id, cancellationToken);

            return stored?.ToWorkflowInstance();
        }

        /// <summary>
        /// Returns every subscription for the given event name and key that became active at or before
        /// <paramref name="asOf"/>. Unlike <see cref="GetFirstOpenSubscription"/>, subscriptions that
        /// already carry an external token are included.
        /// </summary>
        /// <param name="eventName">Event name to match.</param>
        /// <param name="eventKey">Event key to match.</param>
        /// <param name="asOf">Upper bound (inclusive) for the subscription's <c>SubscribeAsOf</c>.</param>
        /// <param name="cancellationToken">Token forwarded to the query execution.</param>
        /// <returns>The matching subscriptions; empty when none match.</returns>
        [UnitOfWork]
        public virtual async Task<IEnumerable<EventSubscription>> GetSubscriptions(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = default)
        {
            var matches = await (await _eventSubscriptionRepository.GetQueryableAsync())
                .Where(x => x.EventName == eventName
                            && x.EventKey == eventKey
                            && x.SubscribeAsOf <= asOf)
                .ToListAsync(cancellationToken);

            return matches.Select(x => x.ToEventSubscription()).ToList();
        }

        /// <summary>
        /// Removes the external token, worker id and token expiry from an event subscription,
        /// undoing what <see cref="SetSubscriptionToken"/> stored.
        /// Does nothing when the subscription no longer exists.
        /// </summary>
        /// <param name="eventSubscriptionId">String form of the subscription's Guid identifier.</param>
        /// <param name="token">Token currently expected on the subscription.</param>
        /// <param name="cancellationToken">Token forwarded to the lookup and update calls.</param>
        /// <returns>A task that completes once the update has been issued to the repository.</returns>
        /// <exception cref="FormatException"><paramref name="eventSubscriptionId"/> is not a valid Guid.</exception>
        /// <exception cref="InvalidOperationException">The subscription holds a different token than <paramref name="token"/>.</exception>
        [UnitOfWork]
        public virtual async Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken cancellationToken = default)
        {
            var subscriptionId = Guid.Parse(eventSubscriptionId);
            var existingEntity = await _eventSubscriptionRepository.FirstOrDefaultAsync(x => x.Id == subscriptionId, cancellationToken);

            if (existingEntity == null)
            {
                // Subscription already removed: there is no token left to clear.
                return;
            }

            if (existingEntity.ExternalToken != token)
            {
                // Only the holder of the current token may release it.
                throw new InvalidOperationException($"Subscription '{eventSubscriptionId}' is not held by the supplied token.");
            }

            existingEntity.ExternalToken = null;
            existingEntity.ExternalWorkerId = null;
            // 'default' resets the expiry whether the property is declared nullable or not.
            existingEntity.ExternalTokenExpiry = default;

            await _eventSubscriptionRepository.UpdateAsync(existingEntity, false, cancellationToken);
        }
    }
}
